2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Update Dequeue command in rdb

This commit is contained in:
Ken Hibino 2020-08-08 06:04:16 -07:00
parent 94aa878060
commit 565f86ee4f
2 changed files with 79 additions and 54 deletions

View File

@ -127,9 +127,6 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
return msg, time.Unix(d, 0), nil return msg, time.Unix(d, 0), nil
} }
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:paused
// KEYS[3] -> asynq:deadlines
// ARGV[1] -> current time in Unix time // ARGV[1] -> current time in Unix time
// ARGV[2:] -> List of queues to query in order // ARGV[2:] -> List of queues to query in order
// //
@ -140,8 +137,11 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
var dequeueCmd = redis.NewScript(` var dequeueCmd = redis.NewScript(`
for i = 2, table.getn(ARGV) do for i = 2, table.getn(ARGV) do
local qkey = ARGV[i] local qkey = ARGV[i]
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then local key_paused = qkey .. ":paused"
local msg = redis.call("RPOPLPUSH", qkey, KEYS[1]) local key_inprogress = qkey .. ":in_progress"
local key_deadlines = qkey .. ":deadlines"
if redis.call("EXISTS", key_paused) == 0 then
local msg = redis.call("RPOPLPUSH", qkey, key_inprogress)
if msg then if msg then
local decoded = cjson.decode(msg) local decoded = cjson.decode(msg)
local timeout = decoded["Timeout"] local timeout = decoded["Timeout"]
@ -156,7 +156,7 @@ for i = 2, table.getn(ARGV) do
else else
return redis.error_reply("asynq internal error: both timeout and deadline are not set") return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end end
redis.call("ZADD", KEYS[3], score, msg) redis.call("ZADD", key_deadlines, score, msg)
return {msg, score} return {msg, score}
end end
end end
@ -167,8 +167,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err
var args []interface{} var args []interface{}
args = append(args, time.Now().Unix()) args = append(args, time.Now().Unix())
args = append(args, qkeys...) args = append(args, qkeys...)
res, err := dequeueCmd.Run(r.client, res, err := dequeueCmd.Run(r.client, nil, args...).Result()
[]string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result()
if err != nil { if err != nil {
return "", 0, err return "", 0, err
} }

View File

@ -146,8 +146,8 @@ func TestDequeue(t *testing.T) {
wantDeadline time.Time wantDeadline time.Time
err error err error
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress map[string][]*base.TaskMessage
wantDeadlines []base.Z wantDeadlines map[string][]base.Z
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@ -160,12 +160,11 @@ func TestDequeue(t *testing.T) {
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{t1}, wantInProgress: map[string][]*base.TaskMessage{
wantDeadlines: []base.Z{ "default": {t1},
{ },
Message: t1, wantDeadlines: map[string][]base.Z{
Score: t1Deadline, "default": {{Message: t1, Score: t1Deadline}},
},
}, },
}, },
{ {
@ -179,8 +178,12 @@ func TestDequeue(t *testing.T) {
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: map[string][]*base.TaskMessage{
wantDeadlines: []base.Z{}, "default": {},
},
wantDeadlines: map[string][]base.Z{
"default": {},
},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@ -197,12 +200,15 @@ func TestDequeue(t *testing.T) {
"critical": {}, "critical": {},
"low": {t3}, "low": {t3},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: map[string][]*base.TaskMessage{
wantDeadlines: []base.Z{ "default": {},
{ "critical": {t2},
Message: t2, "low": {},
Score: t2Deadline, },
}, wantDeadlines: map[string][]base.Z{
"default": {},
"critical": {{Message: t2, Score: t2Deadline}},
"low": {},
}, },
}, },
{ {
@ -220,12 +226,15 @@ func TestDequeue(t *testing.T) {
"critical": {}, "critical": {},
"low": {t2, t1}, "low": {t2, t1},
}, },
wantInProgress: []*base.TaskMessage{t3}, wantInProgress: map[string][]*base.TaskMessage{
wantDeadlines: []base.Z{ "default": {t3},
{ "critical": {},
Message: t3, "low": {},
Score: t3Deadline, },
}, wantDeadlines: map[string][]base.Z{
"default": {{Message: t3, Score: t3Deadline}},
"critical": {},
"low": {},
}, },
}, },
{ {
@ -243,16 +252,22 @@ func TestDequeue(t *testing.T) {
"critical": {}, "critical": {},
"low": {}, "low": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: map[string][]*base.TaskMessage{
wantDeadlines: []base.Z{}, "default": {},
"critical": {},
"low": {},
},
wantDeadlines: map[string][]base.Z{
"default": {},
"critical": {},
"low": {},
},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
for queue, msgs := range tc.enqueued { h.SeedAllEnqueuedQueues(t, r.client, msgs, queue, tc.enqueued)
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
}
gotMsg, gotDeadline, err := r.Dequeue(tc.args...) gotMsg, gotDeadline, err := r.Dequeue(tc.args...)
if err != tc.err { if err != tc.err {
@ -277,21 +292,25 @@ func TestDequeue(t *testing.T) {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
} }
} }
gotInProgress := h.GetInProgressMessages(t, r.client) for queue, want := range tc.wantInProgress {
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { gotInProgress := h.GetInProgressMessages(t, r.client, queue)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff)
}
} }
gotDeadlines := h.GetDeadlinesEntries(t, r.client) for queue, want := range tc.wantDeadlines {
if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff) if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff)
}
} }
} }
} }
func TestDequeueIgnoresPausedQueues(t *testing.T) { func TestDequeueIgnoresPausedQueues(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) t1 := h.NewTaskMessageWithQueue("send_email", map[string]interface{}{"subject": "hello!"}, "default")
t2 := h.NewTaskMessage("export_csv", nil) t2 := h.NewTaskMessageWithQueue("export_csv", nil, "critical")
tests := []struct { tests := []struct {
paused []string // list of paused queues paused []string // list of paused queues
@ -300,7 +319,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
wantMsg *base.TaskMessage wantMsg *base.TaskMessage
err error err error
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress map[string][]*base.TaskMessage
}{ }{
{ {
paused: []string{"default"}, paused: []string{"default"},
@ -315,7 +334,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
"default": {t1}, "default": {t1},
"critical": {}, "critical": {},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: map[string][]*base.TaskMessage{
"default": {},
"critical": {t2},
},
}, },
{ {
paused: []string{"default"}, paused: []string{"default"},
@ -328,7 +350,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: map[string][]*base.TaskMessage{
"default": {},
},
}, },
{ {
paused: []string{"critical", "default"}, paused: []string{"critical", "default"},
@ -343,7 +367,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
"default": {t1}, "default": {t1},
"critical": {t2}, "critical": {t2},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
}, },
} }
@ -354,9 +381,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
for queue, msgs := range tc.enqueued { h.SeedAllEnqueuedQueues(t, r.client, msgs, queue, tc.enqueued)
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
}
got, _, err := r.Dequeue(tc.args...) got, _, err := r.Dequeue(tc.args...)
if !cmp.Equal(got, tc.wantMsg) || err != tc.err { if !cmp.Equal(got, tc.wantMsg) || err != tc.err {
@ -371,10 +396,11 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
} }
} }
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client) gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff)
}
} }
} }
} }