From 26caccbefd5be86c581076b25a09e04c5ed012c6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 22 Feb 2021 06:54:20 -0800 Subject: [PATCH] Update RDB.ForwardIfReady --- forwarder.go | 2 +- internal/asynqtest/asynqtest.go | 16 ++++++++++++---- internal/base/base.go | 2 +- internal/rdb/rdb.go | 16 ++++++++-------- internal/rdb/rdb_test.go | 4 ++-- internal/testbroker/testbroker.go | 4 ++-- 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/forwarder.go b/forwarder.go index 3b2babf..d7bd1f5 100644 --- a/forwarder.go +++ b/forwarder.go @@ -69,7 +69,7 @@ func (f *forwarder) start(wg *sync.WaitGroup) { } func (f *forwarder) exec() { - if err := f.broker.CheckAndEnqueue(f.queues...); err != nil { + if err := f.broker.ForwardIfReady(f.queues...); err != nil { f.logger.Errorf("Could not enqueue scheduled tasks: %v", err) } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 061b6ef..23bef4b 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -279,9 +279,12 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*base.TaskMessage) { - data := MustMarshalSlice(tb, msgs) - for _, s := range data { - if err := c.LPush(key, s).Err(); err != nil { + for _, msg := range msgs { + encoded := MustMarshal(tb, msg) + if err := c.LPush(key, msg.ID.String()).Err(); err != nil { + tb.Fatal(err) + } + if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil { tb.Fatal(err) } } @@ -289,10 +292,15 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []base.Z) { for _, item := range items { - z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)} + msg := item.Message + encoded := MustMarshal(tb, msg) + z := &redis.Z{Member: msg.ID.String(), Score: float64(item.Score)} if err := c.ZAdd(key, z).Err(); err != nil { tb.Fatal(err) } + if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil { + tb.Fatal(err) + } } } diff --git a/internal/base/base.go b/internal/base/base.go index 1a8f7f1..067cc88 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -385,7 +385,7 @@ type Broker interface { ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Archive(msg *TaskMessage, errMsg string) error - CheckAndEnqueue(qnames ...string) error + ForwardIfReady(qnames ...string) error ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error ClearServerState(host string, pid int, serverID string) error diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 82357ac..d4742e2 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -495,9 +495,9 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err() } -// CheckAndEnqueue checks for scheduled/retry tasks for the given queues -//and enqueues any tasks that are ready to be processed. -func (r *RDB) CheckAndEnqueue(qnames ...string) error { +// ForwardIfReady checks scheduled and retry sets of the given queues +// and move any tasks that are ready to be processed to the pending set. +func (r *RDB) ForwardIfReady(qnames ...string) error { for _, qname := range qnames { if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil { return err @@ -514,12 +514,12 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error { // ARGV[1] -> current unix time // Note: Script moves tasks up to 100 at a time to keep the runtime of script short. var forwardCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) -for _, msg in ipairs(msgs) do - redis.call("LPUSH", KEYS[2], msg) - redis.call("ZREM", KEYS[1], msg) +local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) +for _, id in ipairs(ids) do + redis.call("LPUSH", KEYS[2], id) + redis.call("ZREM", KEYS[1], id) end -return table.getn(msgs)`) +return table.getn(ids)`) // forward moves tasks with a score less than the current unix time // from the src zset to the dst list. It returns the number of tasks moved. diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index a24aebf..d82bc95 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1221,7 +1221,7 @@ func TestArchive(t *testing.T) { } } -func TestCheckAndEnqueue(t *testing.T) { +func TestForwardIfReady(t *testing.T) { r := setup(t) defer r.Close() t1 := h.NewTaskMessage("send_email", nil) @@ -1338,7 +1338,7 @@ func TestCheckAndEnqueue(t *testing.T) { h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) - err := r.CheckAndEnqueue(tc.qnames...) + err := r.ForwardIfReady(tc.qnames...) if err != nil { t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err) continue diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 3696401..6ab74d3 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -126,13 +126,13 @@ func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error { return tb.real.Archive(msg, errMsg) } -func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { +func (tb *TestBroker) ForwardIfReady(qnames ...string) error { tb.mu.Lock() defer tb.mu.Unlock() if tb.sleeping { return errRedisDown } - return tb.real.CheckAndEnqueue(qnames...) + return tb.real.ForwardIfReady(qnames...) } func (tb *TestBroker) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {