From aa2676bb57c4c7e488e2dfd2605cd9b7dc0f8c27 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 17 Aug 2020 20:44:41 -0700 Subject: [PATCH] Update Broker interface --- internal/base/base.go | 4 ++-- internal/testbroker/testbroker.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/base/base.go b/internal/base/base.go index 2c0ad81..b4393e7 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -325,8 +325,8 @@ type Broker interface { ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Kill(msg *TaskMessage, errMsg string) error - CheckAndEnqueue() error - ListDeadlineExceeded(deadline time.Time) ([]*TaskMessage, error) + CheckAndEnqueue(qnames ...string) error + ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index e6bc725..c74679e 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -126,22 +126,22 @@ func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error { return tb.real.Kill(msg, errMsg) } -func (tb *TestBroker) CheckAndEnqueue() error { +func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return errRedisDown } - return tb.real.CheckAndEnqueue() + return tb.real.CheckAndEnqueue(qnames...) } -func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { +func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return nil, errRedisDown } - return tb.real.ListDeadlineExceeded(deadline) + return tb.real.ListDeadlineExceeded(deadline, qnames...) } func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {