2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.ForwardIfReady

This commit is contained in:
Ken Hibino 2021-02-22 06:54:20 -08:00
parent b156653243
commit 26caccbefd
6 changed files with 26 additions and 18 deletions

View File

@ -69,7 +69,7 @@ func (f *forwarder) start(wg *sync.WaitGroup) {
} }
func (f *forwarder) exec() { func (f *forwarder) exec() {
if err := f.broker.CheckAndEnqueue(f.queues...); err != nil { if err := f.broker.ForwardIfReady(f.queues...); err != nil {
f.logger.Errorf("Could not enqueue scheduled tasks: %v", err) f.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
} }
} }

View File

@ -279,9 +279,12 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
} }
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) { func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) {
data := MustMarshalSlice(tb, msgs) for _, msg := range msgs {
for _, s := range data { encoded := MustMarshal(tb, msg)
if err := c.LPush(key, s).Err(); err != nil { if err := c.LPush(key, msg.ID.String()).Err(); err != nil {
tb.Fatal(err)
}
if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)
} }
} }
@ -289,10 +292,15 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b
func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) { func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) {
for _, item := range items { for _, item := range items {
z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)} msg := item.Message
encoded := MustMarshal(tb, msg)
z := &redis.Z{Member: msg.ID.String(), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil { if err := c.ZAdd(key, z).Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)
} }
if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil {
tb.Fatal(err)
}
} }
} }

View File

@ -385,7 +385,7 @@ type Broker interface {
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Archive(msg *TaskMessage, errMsg string) error Archive(msg *TaskMessage, errMsg string) error
CheckAndEnqueue(qnames ...string) error ForwardIfReady(qnames ...string) error
ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
ClearServerState(host string, pid int, serverID string) error ClearServerState(host string, pid int, serverID string) error

View File

@ -495,9 +495,9 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err() msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err()
} }
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues // ForwardIfReady checks scheduled and retry sets of the given queues
//and enqueues any tasks that are ready to be processed. // and move any tasks that are ready to be processed to the pending set.
func (r *RDB) CheckAndEnqueue(qnames ...string) error { func (r *RDB) ForwardIfReady(qnames ...string) error {
for _, qname := range qnames { for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil { if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil {
return err return err
@ -514,12 +514,12 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error {
// ARGV[1] -> current unix time // ARGV[1] -> current unix time
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short. // Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], msg) redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], msg) redis.call("ZREM", KEYS[1], id)
end end
return table.getn(msgs)`) return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time // forward moves tasks with a score less than the current unix time
// from the src zset to the dst list. It returns the number of tasks moved. // from the src zset to the dst list. It returns the number of tasks moved.

View File

@ -1221,7 +1221,7 @@ func TestArchive(t *testing.T) {
} }
} }
func TestCheckAndEnqueue(t *testing.T) { func TestForwardIfReady(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
@ -1338,7 +1338,7 @@ func TestCheckAndEnqueue(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
err := r.CheckAndEnqueue(tc.qnames...) err := r.ForwardIfReady(tc.qnames...)
if err != nil { if err != nil {
t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err) t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err)
continue continue

View File

@ -126,13 +126,13 @@ func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error {
return tb.real.Archive(msg, errMsg) return tb.real.Archive(msg, errMsg)
} }
func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { func (tb *TestBroker) ForwardIfReady(qnames ...string) error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {
return errRedisDown return errRedisDown
} }
return tb.real.CheckAndEnqueue(qnames...) return tb.real.ForwardIfReady(qnames...)
} }
func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {