mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 08:12:19 +08:00
Refactor redis keys and store messages in protobuf
Changes: - Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout". - Redis LIST and ZSET stores task message IDs - Task messages are serialized using protocol buffer
This commit is contained in:
parent
0bf767cf21
commit
b9c2572203
@ -416,7 +416,7 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.E(op, errors.CanonicalCode(err), err)
|
return nil, errors.E(op, errors.CanonicalCode(err), err)
|
||||||
}
|
}
|
||||||
return res, nil
|
return zs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
|
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
|
||||||
@ -634,7 +634,7 @@ func (r *RDB) runAll(zset, qname string) (int64, error) {
|
|||||||
}
|
}
|
||||||
res, err := runAllCmd.Run(r.client, keys, argv...).Result()
|
res, err := runAllCmd.Run(r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return err
|
||||||
}
|
}
|
||||||
n, ok := res.(int64)
|
n, ok := res.(int64)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -1147,6 +1147,21 @@ end
|
|||||||
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
|
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
|
||||||
redis.call("DEL", ARGV[1] .. id)
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
end
|
end
|
||||||
|
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
|
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
|
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
|
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
|
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
redis.call("DEL", KEYS[1])
|
redis.call("DEL", KEYS[1])
|
||||||
redis.call("DEL", KEYS[2])
|
redis.call("DEL", KEYS[2])
|
||||||
redis.call("DEL", KEYS[3])
|
redis.call("DEL", KEYS[3])
|
||||||
@ -1195,6 +1210,9 @@ end
|
|||||||
for _, id in ipairs(ids) do
|
for _, id in ipairs(ids) do
|
||||||
redis.call("DEL", ARGV[1] .. id)
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
end
|
end
|
||||||
|
for _, id in ipairs(ids) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
redis.call("DEL", KEYS[1])
|
redis.call("DEL", KEYS[1])
|
||||||
redis.call("DEL", KEYS[2])
|
redis.call("DEL", KEYS[2])
|
||||||
redis.call("DEL", KEYS[3])
|
redis.call("DEL", KEYS[3])
|
||||||
|
@ -150,6 +150,17 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
tc.msg, tc.ttl, err)
|
tc.msg, tc.ttl, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
|
||||||
|
if len(gotPending) != 1 {
|
||||||
|
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
|
||||||
|
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
||||||
|
}
|
||||||
|
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
|
||||||
|
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
|
||||||
|
}
|
||||||
|
|
||||||
// Check Pending list has task ID.
|
// Check Pending list has task ID.
|
||||||
pendingKey := base.PendingKey(tc.msg.Queue)
|
pendingKey := base.PendingKey(tc.msg.Queue)
|
||||||
|
Loading…
Reference in New Issue
Block a user