2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Define broker interface

This commit is contained in:
Ken Hibino 2020-04-17 06:56:44 -07:00
parent 42453280f4
commit f8a94fb839
7 changed files with 58 additions and 38 deletions

View File

@ -7,8 +7,10 @@ package asynq
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
) )
// Task represents a unit of work to be performed. // Task represents a unit of work to be performed.
@ -30,6 +32,28 @@ func NewTask(typename string, payload map[string]interface{}) *Task {
} }
} }
// broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
type broker interface {
Enqueue(msg *base.TaskMessage) error
EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*base.TaskMessage, error)
Done(msg *base.TaskMessage) error
Requeue(msg *base.TaskMessage) error
Schedule(msg *base.TaskMessage, processAt time.Time) error
ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *base.TaskMessage, errMsg string) error
RequeueAll() (int64, error)
CheckAndEnqueue(qnames ...string) error
WriteServerState(ss *base.ServerState, ttl time.Duration) error
ClearServerState(ss *base.ServerState) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
Close() error
}
// RedisConnOpt is a discriminated union of types that represent Redis connection configuration option. // RedisConnOpt is a discriminated union of types that represent Redis connection configuration option.
// //
// RedisConnOpt represents a sum of following types: // RedisConnOpt represents a sum of following types:

View File

@ -9,14 +9,13 @@ import (
"time" "time"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
) )
// heartbeater is responsible for writing process info to redis periodically to // heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up. // indicate that the background worker process is up.
type heartbeater struct { type heartbeater struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
ss *base.ServerState ss *base.ServerState
@ -27,10 +26,10 @@ type heartbeater struct {
interval time.Duration interval time.Duration
} }
func newHeartbeater(l Logger, rdb *rdb.RDB, ss *base.ServerState, interval time.Duration) *heartbeater { func newHeartbeater(l Logger, b broker, ss *base.ServerState, interval time.Duration) *heartbeater {
return &heartbeater{ return &heartbeater{
logger: l, logger: l,
rdb: rdb, broker: b,
ss: ss, ss: ss,
done: make(chan struct{}), done: make(chan struct{}),
interval: interval, interval: interval,
@ -53,7 +52,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
for { for {
select { select {
case <-h.done: case <-h.done:
h.rdb.ClearServerState(h.ss) h.broker.ClearServerState(h.ss)
h.logger.Info("Heartbeater done") h.logger.Info("Heartbeater done")
return return
case <-time.After(h.interval): case <-time.After(h.interval):
@ -66,7 +65,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
func (h *heartbeater) beat() { func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again // Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed. // and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteServerState(h.ss, h.interval*2) err := h.broker.WriteServerState(h.ss, h.interval*2)
if err != nil { if err != nil {
h.logger.Error("could not write heartbeat data: %v", err) h.logger.Error("could not write heartbeat data: %v", err)
} }

View File

@ -19,7 +19,7 @@ import (
type processor struct { type processor struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
ss *base.ServerState ss *base.ServerState
@ -65,7 +65,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration
type newProcessorParams struct { type newProcessorParams struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
ss *base.ServerState ss *base.ServerState
retryDelayFunc retryDelayFunc retryDelayFunc retryDelayFunc
syncCh chan<- *syncRequest syncCh chan<- *syncRequest
@ -84,7 +84,7 @@ func newProcessor(params newProcessorParams) *processor {
} }
return &processor{ return &processor{
logger: params.logger, logger: params.logger,
rdb: params.rdb, broker: params.broker,
ss: params.ss, ss: params.ss,
queueConfig: qcfg, queueConfig: qcfg,
orderedQueues: orderedQueues, orderedQueues: orderedQueues,
@ -157,8 +157,8 @@ func (p *processor) start(wg *sync.WaitGroup) {
// process the task. // process the task.
func (p *processor) exec() { func (p *processor) exec() {
qnames := p.queues() qnames := p.queues()
msg, err := p.rdb.Dequeue(qnames...) msg, err := p.broker.Dequeue(qnames...)
if err == rdb.ErrNoProcessableTask { if err == rdb.ErrNoProcessableTask { // TODO: Need to decouple this error from rdb to support other brokers
// queues are empty, this is a normal behavior. // queues are empty, this is a normal behavior.
if len(p.queueConfig) > 1 { if len(p.queueConfig) > 1 {
// sleep to avoid slamming redis and let scheduler move tasks into queues. // sleep to avoid slamming redis and let scheduler move tasks into queues.
@ -227,7 +227,7 @@ func (p *processor) exec() {
// restore moves all tasks from "in-progress" back to queue // restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks. // to restore all unfinished tasks.
func (p *processor) restore() { func (p *processor) restore() {
n, err := p.rdb.RequeueAll() n, err := p.broker.RequeueAll()
if err != nil { if err != nil {
p.logger.Error("Could not restore unfinished tasks: %v", err) p.logger.Error("Could not restore unfinished tasks: %v", err)
} }
@ -237,20 +237,20 @@ func (p *processor) restore() {
} }
func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) requeue(msg *base.TaskMessage) {
err := p.rdb.Requeue(msg) err := p.broker.Requeue(msg)
if err != nil { if err != nil {
p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err) p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err)
} }
} }
func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg) err := p.broker.Done(msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
p.logger.Warn("%s; Will retry syncing", errMsg) p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Done(msg) return p.broker.Done(msg)
}, },
errMsg: errMsg, errMsg: errMsg,
} }
@ -260,13 +260,13 @@ func (p *processor) markAsDone(msg *base.TaskMessage) {
func (p *processor) retry(msg *base.TaskMessage, e error) { func (p *processor) retry(msg *base.TaskMessage, e error) {
d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload)) d := p.retryDelayFunc(msg.Retried, e, NewTask(msg.Type, msg.Payload))
retryAt := time.Now().Add(d) retryAt := time.Now().Add(d)
err := p.rdb.Retry(msg, retryAt, e.Error()) err := p.broker.Retry(msg, retryAt, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
p.logger.Warn("%s; Will retry syncing", errMsg) p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error()) return p.broker.Retry(msg, retryAt, e.Error())
}, },
errMsg: errMsg, errMsg: errMsg,
} }
@ -275,13 +275,13 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
func (p *processor) kill(msg *base.TaskMessage, e error) { func (p *processor) kill(msg *base.TaskMessage, e error) {
p.logger.Warn("Retry exhausted for task id=%s", msg.ID) p.logger.Warn("Retry exhausted for task id=%s", msg.ID)
err := p.rdb.Kill(msg, e.Error()) err := p.broker.Kill(msg, e.Error())
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue) errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
p.logger.Warn("%s; Will retry syncing", errMsg) p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {
return p.rdb.Kill(msg, e.Error()) return p.broker.Kill(msg, e.Error())
}, },
errMsg: errMsg, errMsg: errMsg,
} }

View File

@ -71,7 +71,7 @@ func TestProcessorSuccess(t *testing.T) {
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
rdb: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
@ -178,7 +178,7 @@ func TestProcessorRetry(t *testing.T) {
cancelations := base.NewCancelations() cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
rdb: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
syncCh: nil, syncCh: nil,
@ -253,7 +253,7 @@ func TestProcessorQueues(t *testing.T) {
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
rdb: nil, broker: nil,
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
@ -330,7 +330,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
rdb: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,

View File

@ -7,13 +7,11 @@ package asynq
import ( import (
"sync" "sync"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
) )
type scheduler struct { type scheduler struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
// channel to communicate back to the long running "scheduler" goroutine. // channel to communicate back to the long running "scheduler" goroutine.
done chan struct{} done chan struct{}
@ -25,14 +23,14 @@ type scheduler struct {
qnames []string qnames []string
} }
func newScheduler(l Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler { func newScheduler(l Logger, b broker, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string var qnames []string
for q := range qcfg { for q := range qcfg {
qnames = append(qnames, q) qnames = append(qnames, q)
} }
return &scheduler{ return &scheduler{
logger: l, logger: l,
rdb: r, broker: b,
done: make(chan struct{}), done: make(chan struct{}),
avgInterval: avgInterval, avgInterval: avgInterval,
qnames: qnames, qnames: qnames,
@ -63,7 +61,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
} }
func (s *scheduler) exec() { func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil { if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil {
s.logger.Error("Could not enqueue scheduled tasks: %v", err) s.logger.Error("Could not enqueue scheduled tasks: %v", err)
} }
} }

View File

@ -36,7 +36,7 @@ type Server struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup
@ -208,7 +208,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
subscriber := newSubscriber(logger, rdb, cancels) subscriber := newSubscriber(logger, rdb, cancels)
processor := newProcessor(newProcessorParams{ processor := newProcessor(newProcessorParams{
logger: logger, logger: logger,
rdb: rdb, broker: rdb,
ss: ss, ss: ss,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
syncCh: syncCh, syncCh: syncCh,
@ -219,7 +219,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
return &Server{ return &Server{
ss: ss, ss: ss,
logger: logger, logger: logger,
rdb: rdb, broker: rdb,
scheduler: scheduler, scheduler: scheduler,
processor: processor, processor: processor,
syncer: syncer, syncer: syncer,
@ -330,7 +330,7 @@ func (srv *Server) Stop() {
srv.wg.Wait() srv.wg.Wait()
srv.rdb.Close() srv.broker.Close()
srv.ss.SetStatus(base.StatusStopped) srv.ss.SetStatus(base.StatusStopped)
srv.logger.Info("Bye!") srv.logger.Info("Bye!")

View File

@ -10,12 +10,11 @@ import (
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
) )
type subscriber struct { type subscriber struct {
logger Logger logger Logger
rdb *rdb.RDB broker broker
// channel to communicate back to the long running "subscriber" goroutine. // channel to communicate back to the long running "subscriber" goroutine.
done chan struct{} done chan struct{}
@ -24,10 +23,10 @@ type subscriber struct {
cancelations *base.Cancelations cancelations *base.Cancelations
} }
func newSubscriber(l Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber { func newSubscriber(l Logger, b broker, cancelations *base.Cancelations) *subscriber {
return &subscriber{ return &subscriber{
logger: l, logger: l,
rdb: rdb, broker: b,
done: make(chan struct{}), done: make(chan struct{}),
cancelations: cancelations, cancelations: cancelations,
} }
@ -49,7 +48,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
) )
// Try until successfully connect to Redis. // Try until successfully connect to Redis.
for { for {
pubsub, err = s.rdb.CancelationPubSub() pubsub, err = s.broker.CancelationPubSub()
if err != nil { if err != nil {
s.logger.Error("cannot subscribe to cancelation channel: %v", err) s.logger.Error("cannot subscribe to cancelation channel: %v", err)
select { select {