2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Update Broker interface

This commit is contained in:
Ken Hibino 2020-08-17 20:44:41 -07:00
parent 9348a62691
commit aa2676bb57
2 changed files with 6 additions and 6 deletions

View File

@ -325,8 +325,8 @@ type Broker interface {
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *TaskMessage, errMsg string) error Kill(msg *TaskMessage, errMsg string) error
CheckAndEnqueue() error CheckAndEnqueue(qnames ...string) error
ListDeadlineExceeded(deadline time.Time) ([]*TaskMessage, error) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
ClearServerState(host string, pid int, serverID string) error ClearServerState(host string, pid int, serverID string) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers

View File

@ -126,22 +126,22 @@ func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error {
return tb.real.Kill(msg, errMsg) return tb.real.Kill(msg, errMsg)
} }
func (tb *TestBroker) CheckAndEnqueue() error { func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return errRedisDown return errRedisDown
} }
return tb.real.CheckAndEnqueue() return tb.real.CheckAndEnqueue(qnames...)
} }
func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return nil, errRedisDown return nil, errRedisDown
} }
return tb.real.ListDeadlineExceeded(deadline) return tb.real.ListDeadlineExceeded(deadline, qnames...)
} }
func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { func (tb *TestBroker) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {