From 7c75abe334aaf1557f97b17ac56b2f441343875f Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 24 Feb 2021 11:22:05 -0800 Subject: [PATCH] Update RDB.Dequeue --- .gitignore | 5 ++- CHANGELOG.md | 5 +++ README.md | 2 +- internal/asynqtest/asynqtest.go | 34 +++++++++++------ internal/rdb/rdb.go | 67 ++++++++++++++++++++++----------- internal/rdb/rdb_test.go | 18 +++++---- 6 files changed, 89 insertions(+), 42 deletions(-) diff --git a/.gitignore b/.gitignore index 7c03ff8..63a7360 100644 --- a/.gitignore +++ b/.gitignore @@ -18,4 +18,7 @@ /tools/asynq/asynq # Ignore asynq config file -.asynq.* \ No newline at end of file +.asynq.* + +# Ignore editor config files +.vscode \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ca1360..b81aab0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Requires redis v4.0+ for multiple field/value pair support +- Renamed pending key (TODO: need migration script) + ## [0.16.0] - 2021-03-10 ### Added diff --git a/README.md b/README.md index f91f95d..5286791 100644 --- a/README.md +++ b/README.md @@ -293,7 +293,7 @@ go get -u github.com/hibiken/asynq/tools/asynq | Dependency | Version | | -------------------------- | ------- | -| [Redis](https://redis.io/) | v3.0+ | +| [Redis](https://redis.io/) | v4.0+ | | [Go](https://golang.org/) | v1.13+ | ## Contributing diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 23bef4b..8ee0193 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -284,7 +284,13 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, msgs []*b if err := c.LPush(key, msg.ID.String()).Err(); err != nil { tb.Fatal(err) } - if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil { + key := base.TaskKey(msg.Queue, msg.ID.String()) + data := map[string]interface{}{ + "msg": encoded, + "timeout": msg.Timeout, + "deadline": msg.Deadline, + } + if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) } } @@ -298,7 +304,13 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b if err := c.ZAdd(key, z).Err(); err != nil { tb.Fatal(err) } - if err := c.Set(base.TaskKey(msg.Queue, msg.ID.String()), encoded, 0).Err(); err != nil { + key := base.TaskKey(msg.Queue, msg.ID.String()) + data := map[string]interface{}{ + "msg": encoded, + "timeout": msg.Timeout, + "deadline": msg.Deadline, + } + if err := c.HSet(key, data).Err(); err != nil { tb.Fatal(err) } } @@ -310,7 +322,7 @@ func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) [] ids := r.LRange(base.PendingKey(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { - data := r.Get(base.TaskKey(qname, id)).Val() + data := r.HGet(base.TaskKey(qname, id), "msg").Val() msgs = append(msgs, MustUnmarshal(tb, data)) } return msgs @@ -322,7 +334,7 @@ func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []* ids := r.LRange(base.ActiveKey(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { - data := r.Get(base.TaskKey(qname, id)).Val() + data := r.HGet(base.TaskKey(qname, id), "msg").Val() msgs = append(msgs, MustUnmarshal(tb, data)) } return msgs @@ -334,7 +346,7 @@ func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) ids := r.ZRange(base.ScheduledKey(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { - msg := r.Get(base.TaskKey(qname, id)).Val() + msg := r.HGet(base.TaskKey(qname, id), "msg").Val() msgs = append(msgs, MustUnmarshal(tb, msg)) } return msgs @@ -346,7 +358,7 @@ func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*b ids := r.ZRange(base.RetryKey(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { - msg := r.Get(base.TaskKey(qname, id)).Val() + msg := r.HGet(base.TaskKey(qname, id), "msg").Val() msgs = append(msgs, MustUnmarshal(tb, msg)) } return msgs @@ -358,7 +370,7 @@ func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) [ ids := r.ZRange(base.ArchivedKey(qname), 0, -1).Val() var msgs []*base.TaskMessage for _, id := range ids { - msg := r.Get(base.TaskKey(qname, id)).Val() + msg := r.HGet(base.TaskKey(qname, id), "msg").Val() msgs = append(msgs, MustUnmarshal(tb, msg)) } return msgs @@ -370,7 +382,7 @@ func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) [ zs := r.ZRangeWithScores(base.ScheduledKey(qname), 0, -1).Val() var res []base.Z for _, z := range zs { - msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)}) } return res @@ -382,7 +394,7 @@ func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []bas zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val() var res []base.Z for _, z := range zs { - msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() res = append(res, base.Z{ Message: MustUnmarshal(tb, msg), Score: int64(z.Score), @@ -397,7 +409,7 @@ func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) [] zs := r.ZRangeWithScores(base.ArchivedKey(qname), 0, -1).Val() var res []base.Z for _, z := range zs { - msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() res = append(res, base.Z{ Message: MustUnmarshal(tb, msg), Score: int64(z.Score), @@ -412,7 +424,7 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [ zs := r.ZRangeWithScores(base.DeadlinesKey(qname), 0, -1).Val() var res []base.Z for _, z := range zs { - msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + msg := r.HGet(base.TaskKey(qname, z.Member.(string)), "msg").Val() res = append(res, base.Z{ Message: MustUnmarshal(tb, msg), Score: int64(z.Score), diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c9e8ccb..0e25b77 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -53,8 +53,10 @@ func (r *RDB) Ping() error { // KEYS[2] -> asynq:{}:pending // ARGV[1] -> task message data // ARGV[2] -> task ID +// ARGV[3] -> task timeout in seconds (0 if not timeout) +// ARGV[4] -> task deadline in unix time (0 if no deadline) var enqueueCmd = redis.NewScript(` -redis.call("SET", KEYS[1], ARGV[1]) +redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4]) redis.call("LPUSH", KEYS[2], ARGV[2]) return 1 `) @@ -75,6 +77,8 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { argv := []interface{}{ encoded, msg.ID.String(), + msg.Timeout, + msg.Deadline, } return enqueueCmd.Run(r.client, keys, argv...).Err() } @@ -85,12 +89,14 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> task message data +// ARGV[4] -> task timeout in seconds (0 if not timeout) +// ARGV[5] -> task deadline in unix time (0 if no deadline) var enqueueUniqueCmd = redis.NewScript(` local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("SET", KEYS[2], ARGV[3]) +redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) redis.call("LPUSH", KEYS[3], ARGV[1]) return 1 `) @@ -114,6 +120,8 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { msg.ID.String(), int(ttl.Seconds()), encoded, + msg.Timeout, + msg.Deadline, } res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -134,21 +142,22 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { // Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { - data, d, err := r.dequeue(qnames...) + encoded, d, err := r.dequeue(qnames...) if err != nil { return nil, time.Time{}, err } - if msg, err = base.DecodeMessage(data); err != nil { + if msg, err = base.DecodeMessage(encoded); err != nil { return nil, time.Time{}, err } return msg, time.Unix(d, 0), nil } -// KEYS[1] -> asynq:{} +// KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:paused // KEYS[3] -> asynq:{}:active // KEYS[4] -> asynq:{}:deadlines -// ARGV[1] -> current time in Unix time +// ARGV[1] -> current time in Unix time +// ARGV[2] -> task key prefix // // dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. @@ -156,11 +165,13 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti // and inserts the task with deadlines set. var dequeueCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[2]) == 0 then - local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) - if msg then - local decoded = cjson.decode(msg) - local timeout = decoded["Timeout"] - local deadline = decoded["Deadline"] + local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) + if id then + local key = ARGV[2] .. id + local data = redis.call("HMGET", key, "msg", "timeout", "deadline") + local msg = data[1] + local timeout = tonumber(data[2]) + local deadline = tonumber(data[3]) local score if timeout ~= 0 and deadline ~= 0 then score = math.min(ARGV[1]+timeout, deadline) @@ -171,13 +182,13 @@ if redis.call("EXISTS", KEYS[2]) == 0 then else return redis.error_reply("asynq internal error: both timeout and deadline are not set") end - redis.call("ZADD", KEYS[4], score, msg) + redis.call("ZADD", KEYS[4], score, id) return {msg, score} end end return nil`) -func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) { +func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err error) { for _, qname := range qnames { keys := []string{ base.PendingKey(qname), @@ -185,7 +196,11 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err base.ActiveKey(qname), base.DeadlinesKey(qname), } - res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result() + argv := []interface{}{ + time.Now().Unix(), + base.TaskKeyPrefix(qname), + } + res, err := dequeueCmd.Run(r.client, keys, argv...).Result() if err == redis.Nil { continue } else if err != nil { @@ -198,13 +213,14 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err if len(data) != 2 { return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) } - if msgjson, err = cast.ToStringE(data[0]); err != nil { + if encoded, err = cast.ToStringE(data[0]); err != nil { return "", 0, err } if deadline, err = cast.ToInt64E(data[1]); err != nil { return "", 0, err } - return msgjson, deadline, nil + fmt.Printf("debug: Returning msgjson=%s deadline=%d\n", encoded, deadline) + return encoded, deadline, nil } return "", 0, ErrNoProcessableTask } @@ -308,8 +324,10 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { // ARGV[1] -> task message data // ARGV[2] -> process_at time in Unix time // ARGV[3] -> task ID +// ARGV[4] -> task timeout in seconds (0 if not timeout) +// ARGV[5] -> task deadline in unix time (0 if no deadline) var scheduleCmd = redis.NewScript(` -redis.call("SET", KEYS[1], ARGV[1]) +redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) return 1 `) @@ -331,6 +349,8 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { encoded, processAt.Unix(), msg.ID.String(), + msg.Timeout, + msg.Deadline, } return scheduleCmd.Run(r.client, keys, argv...).Err() } @@ -342,12 +362,14 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> score (process_at timestamp) // ARGV[4] -> task message +// ARGV[5] -> task timeout in seconds (0 if not timeout) +// ARGV[6] -> task deadline in unix time (0 if no deadline) var scheduleUniqueCmd = redis.NewScript(` local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("SET", KEYS[2], ARGV[4]) +redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6]) redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) return 1 `) @@ -372,6 +394,8 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim int(ttl.Seconds()), processAt.Unix(), encoded, + msg.Timeout, + msg.Deadline, } res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -405,7 +429,7 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) -redis.call("SET", KEYS[1], ARGV[2]) +redis.call("HSET", KEYS[1], "msg", ARGV[2]) local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[4]) @@ -472,7 +496,7 @@ end redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) -redis.call("SET", KEYS[1], ARGV[2]) +redis.call("HSET", KEYS[1], "msg", ARGV[2]) local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[6]) @@ -571,7 +595,8 @@ var listDeadlineExceededCmd = redis.NewScript(` local res = {} local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) for _, id in ipairs(ids) do - table.insert(res, redis.call("GET", ARGV[2] .. id)) + local key = ARGV[2] .. id + table.insert(res, redis.call("HGET", key, "msg")) end return res `) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 5f7e682..780cc56 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -158,6 +158,7 @@ func TestDequeue(t *testing.T) { ID: uuid.New(), Type: "send_email", Payload: map[string]interface{}{"subject": "hello!"}, + Queue: "default", Timeout: 1800, Deadline: 0, } @@ -166,6 +167,7 @@ func TestDequeue(t *testing.T) { ID: uuid.New(), Type: "export_csv", Payload: nil, + Queue: "critical", Timeout: 0, Deadline: 1593021600, } @@ -174,10 +176,10 @@ func TestDequeue(t *testing.T) { ID: uuid.New(), Type: "reindex", Payload: nil, + Queue: "low", Timeout: int64((5 * time.Minute).Seconds()), Deadline: time.Now().Add(10 * time.Minute).Unix(), } - t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest tests := []struct { pending map[string][]*base.TaskMessage @@ -253,26 +255,26 @@ func TestDequeue(t *testing.T) { }, { pending: map[string][]*base.TaskMessage{ - "default": {t3}, + "default": {t1}, "critical": {}, - "low": {t2, t1}, + "low": {t3}, }, args: []string{"critical", "default", "low"}, - wantMsg: t3, - wantDeadline: time.Unix(t3Deadline, 0), + wantMsg: t1, + wantDeadline: time.Unix(t1Deadline, 0), err: nil, wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, - "low": {t2, t1}, + "low": {t3}, }, wantActive: map[string][]*base.TaskMessage{ - "default": {t3}, + "default": {t1}, "critical": {}, "low": {}, }, wantDeadlines: map[string][]base.Z{ - "default": {{Message: t3, Score: t3Deadline}}, + "default": {{Message: t1, Score: t1Deadline}}, "critical": {}, "low": {}, },