2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-22 22:06:12 +08:00

Refactor redis keys and store messages in protobuf

Changes:
- Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout".
- Redis LIST and ZSET stores task message IDs
- Task messages are serialized using protocol buffer
This commit is contained in:
Ken Hibino
2021-03-12 16:23:08 -08:00
parent 63ce9ed0f9
commit a0b52806c9
22 changed files with 2557 additions and 610 deletions

View File

@@ -518,8 +518,8 @@ func TestInspectorListPendingTasks(t *testing.T) {
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessage("task4", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "critical")
m4 := h.NewTaskMessageWithQueue("task4", nil, "low")
inspector := New(getRedisConnOpt(t))
@@ -587,8 +587,8 @@ func TestInspectorListActiveTasks(t *testing.T) {
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessage("task4", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
inspector := New(getRedisConnOpt(t))
@@ -1254,13 +1254,13 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) {
},
{
pending: map[string][]*base.TaskMessage{
"default": {m3, m4},
"default": {m3},
},
archived: map[string][]base.Z{
"default": {z1, z2},
},
qname: "default",
want: 2,
want: 1,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
@@ -1269,7 +1269,6 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) {
z1,
z2,
base.Z{Message: m3, Score: now.Unix()},
base.Z{Message: m4, Score: now.Unix()},
},
},
},