From b9c2572203dbb8252c2a6724b539546f55e3df4b Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 12 Mar 2021 16:23:08 -0800 Subject: [PATCH] Refactor redis keys and store messages in protobuf Changes: - Task messages are stored under "asynq:{}:t:" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout". - Redis LIST and ZSET stores task message IDs - Task messages are serialized using protocol buffer --- internal/rdb/inspect.go | 22 ++++++++++++++++++++-- internal/rdb/rdb_test.go | 11 +++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 6430f1a..7b1a25b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -416,7 +416,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { if err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } - return res, nil + return zs, nil } // KEYS[1] -> key for ids set (e.g. asynq:{}:scheduled) @@ -634,7 +634,7 @@ func (r *RDB) runAll(zset, qname string) (int64, error) { } res, err := runAllCmd.Run(r.client, keys, argv...).Result() if err != nil { - return 0, err + return err } n, ok := res.(int64) if !ok { @@ -1147,6 +1147,21 @@ end for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do redis.call("DEL", ARGV[1] .. id) end +for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end +for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do + redis.call("DEL", ARGV[1] .. id) +end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[3]) @@ -1195,6 +1210,9 @@ end for _, id in ipairs(ids) do redis.call("DEL", ARGV[1] .. id) end +for _, id in ipairs(ids) do + redis.call("DEL", ARGV[1] .. id) +end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) redis.call("DEL", KEYS[3]) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index e711ae3..95d9e39 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -150,6 +150,17 @@ func TestEnqueueUnique(t *testing.T) { tc.msg, tc.ttl, err) continue } + gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue) + if len(gotPending) != 1 { + t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending)) + continue + } + if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { + t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) + } + if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { + t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) + } // Check Pending list has task ID. pendingKey := base.PendingKey(tc.msg.Queue)