2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add task message to deadlines set on dequeue

Updated dequeueCmd to decode the message and compute its deadline and add
the message to the Deadline set.
This commit is contained in:
Ken Hibino 2020-06-17 06:46:54 -07:00
parent 716d3d987e
commit 6cc5bafaba
3 changed files with 89 additions and 13 deletions

View File

@ -259,6 +259,12 @@ func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
return getZSetEntries(tb, r, base.DeadQueue)
}
// GetDeadlinesEntries returns all task messages and its score in the deadlines set.
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
tb.Helper()
return getZSetEntries(tb, r, base.KeyDeadlines)
}
func getListMessages(tb testing.TB, r *redis.Client, list string) []*base.TaskMessage {
data := r.LRange(list, 0, -1).Val()
return MustUnmarshalSlice(tb, data)

View File

@ -120,17 +120,36 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
return base.DecodeMessage(data)
}
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:paused
// ARGV -> List of queues to query in order
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:paused
// KEYS[3] -> asynq:deadlines
// ARGV[1] -> current time in Unix time
// ARGV[2:] -> List of queues to query in order
//
// dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
// It computes the task deadline by inspecting Timout and Deadline fields,
// and inserts the task with deadlines set.
var dequeueCmd = redis.NewScript(`
for _, qkey in ipairs(ARGV) do
for i = 2, table.getn(ARGV) do
local qkey = ARGV[i]
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
local res = redis.call("RPOPLPUSH", qkey, KEYS[1])
if res then
local decoded = cjson.decode(res)
local timeout = decoded["Timeout"]
local deadline = decoded["Deadline"]
local score
if timeout ~= 0 and deadline ~= 0 then
score = math.min(ARGV[1]+timeout, deadline)
elseif timeout ~= 0 then
score = ARGV[1] + timeout
elseif deadline ~= 0 then
score = deadline
else
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end
redis.call("ZADD", KEYS[3], score, res)
return res
end
end
@ -138,8 +157,11 @@ end
return nil`)
func (r *RDB) dequeue(qkeys ...interface{}) (data string, err error) {
var args []interface{}
args = append(args, time.Now().Unix())
args = append(args, qkeys...)
res, err := dequeueCmd.Run(r.client,
[]string{base.InProgressQueue, base.PausedQueues}, qkeys...).Result()
[]string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result()
if err != nil {
return "", err
}

View File

@ -114,9 +114,31 @@ func TestEnqueueUnique(t *testing.T) {
func TestDequeue(t *testing.T) {
r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessage("reindex", nil)
now := time.Now()
t1 := &base.TaskMessage{
ID: xid.New(),
Type: "send_email",
Payload: map[string]interface{}{"subject": "hello!"},
Timeout: 1800,
Deadline: 0,
}
t1Deadline := int(now.Unix()) + t1.Timeout
t2 := &base.TaskMessage{
ID: xid.New(),
Type: "export_csv",
Payload: nil,
Timeout: 0,
Deadline: 1593021600,
}
t2Deadline := t2.Deadline
t3 := &base.TaskMessage{
ID: xid.New(),
Type: "reindex",
Payload: nil,
Timeout: int((5 * time.Minute).Seconds()),
Deadline: int(time.Now().Add(10 * time.Minute).Unix()),
}
t3Deadline := int(now.Unix()) + t3.Timeout // use whichever is earliest
tests := []struct {
enqueued map[string][]*base.TaskMessage
@ -125,6 +147,7 @@ func TestDequeue(t *testing.T) {
err error
wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage
wantDeadlines []h.ZSetEntry
}{
{
enqueued: map[string][]*base.TaskMessage{
@ -137,6 +160,12 @@ func TestDequeue(t *testing.T) {
"default": {},
},
wantInProgress: []*base.TaskMessage{t1},
wantDeadlines: []h.ZSetEntry{
{
Msg: t1,
Score: float64(t1Deadline),
},
},
},
{
enqueued: map[string][]*base.TaskMessage{
@ -149,6 +178,7 @@ func TestDequeue(t *testing.T) {
"default": {},
},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{},
},
{
enqueued: map[string][]*base.TaskMessage{
@ -165,22 +195,34 @@ func TestDequeue(t *testing.T) {
"low": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{
{
Msg: t2,
Score: float64(t2Deadline),
},
},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
"default": {t3},
"critical": {},
"low": {t2, t3},
"low": {t2, t1},
},
args: []string{"critical", "default", "low"},
want: t1,
want: t3,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {t2, t3},
"low": {t2, t1},
},
wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []h.ZSetEntry{
{
Msg: t3,
Score: float64(t3Deadline),
},
},
wantInProgress: []*base.TaskMessage{t1},
},
{
enqueued: map[string][]*base.TaskMessage{
@ -197,6 +239,7 @@ func TestDequeue(t *testing.T) {
"low": {},
},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{},
},
}
@ -224,6 +267,11 @@ func TestDequeue(t *testing.T) {
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff)
}
gotDeadlines := h.GetDeadlinesEntries(t, r.client)
if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff)
}
}
}