From 6cc5bafabaafbd444eea704f39519d4efa11ba12 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 17 Jun 2020 06:46:54 -0700 Subject: [PATCH] Add task message to deadlines set on dequeue Updated dequeueCmd to decode the message and compute its deadline and add the message to the Deadline set. --- internal/asynqtest/asynqtest.go | 6 ++++ internal/rdb/rdb.go | 32 ++++++++++++++--- internal/rdb/rdb_test.go | 64 ++++++++++++++++++++++++++++----- 3 files changed, 89 insertions(+), 13 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index c59ff65..a3049f2 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -259,6 +259,12 @@ func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry { return getZSetEntries(tb, r, base.DeadQueue) } +// GetDeadlinesEntries returns all task messages and its score in the deadlines set. +func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry { + tb.Helper() + return getZSetEntries(tb, r, base.KeyDeadlines) +} + func getListMessages(tb testing.TB, r *redis.Client, list string) []*base.TaskMessage { data := r.LRange(list, 0, -1).Val() return MustUnmarshalSlice(tb, data) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index af71be9..9314358 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -120,17 +120,36 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { return base.DecodeMessage(data) } -// KEYS[1] -> asynq:in_progress -// KEYS[2] -> asynq:paused -// ARGV -> List of queues to query in order +// KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:paused +// KEYS[3] -> asynq:deadlines +// ARGV[1] -> current time in Unix time +// ARGV[2:] -> List of queues to query in order // // dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. +// It computes the task deadline by inspecting Timout and Deadline fields, +// and inserts the task with deadlines set. var dequeueCmd = redis.NewScript(` -for _, qkey in ipairs(ARGV) do +for i = 2, table.getn(ARGV) do + local qkey = ARGV[i] if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then local res = redis.call("RPOPLPUSH", qkey, KEYS[1]) if res then + local decoded = cjson.decode(res) + local timeout = decoded["Timeout"] + local deadline = decoded["Deadline"] + local score + if timeout ~= 0 and deadline ~= 0 then + score = math.min(ARGV[1]+timeout, deadline) + elseif timeout ~= 0 then + score = ARGV[1] + timeout + elseif deadline ~= 0 then + score = deadline + else + return redis.error_reply("asynq internal error: both timeout and deadline are not set") + end + redis.call("ZADD", KEYS[3], score, res) return res end end @@ -138,8 +157,11 @@ end return nil`) func (r *RDB) dequeue(qkeys ...interface{}) (data string, err error) { + var args []interface{} + args = append(args, time.Now().Unix()) + args = append(args, qkeys...) res, err := dequeueCmd.Run(r.client, - []string{base.InProgressQueue, base.PausedQueues}, qkeys...).Result() + []string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result() if err != nil { return "", err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index ca3389d..3732feb 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -114,9 +114,31 @@ func TestEnqueueUnique(t *testing.T) { func TestDequeue(t *testing.T) { r := setup(t) - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) - t2 := h.NewTaskMessage("export_csv", nil) - t3 := h.NewTaskMessage("reindex", nil) + now := time.Now() + t1 := &base.TaskMessage{ + ID: xid.New(), + Type: "send_email", + Payload: map[string]interface{}{"subject": "hello!"}, + Timeout: 1800, + Deadline: 0, + } + t1Deadline := int(now.Unix()) + t1.Timeout + t2 := &base.TaskMessage{ + ID: xid.New(), + Type: "export_csv", + Payload: nil, + Timeout: 0, + Deadline: 1593021600, + } + t2Deadline := t2.Deadline + t3 := &base.TaskMessage{ + ID: xid.New(), + Type: "reindex", + Payload: nil, + Timeout: int((5 * time.Minute).Seconds()), + Deadline: int(time.Now().Add(10 * time.Minute).Unix()), + } + t3Deadline := int(now.Unix()) + t3.Timeout // use whichever is earliest tests := []struct { enqueued map[string][]*base.TaskMessage @@ -125,6 +147,7 @@ func TestDequeue(t *testing.T) { err error wantEnqueued map[string][]*base.TaskMessage wantInProgress []*base.TaskMessage + wantDeadlines []h.ZSetEntry }{ { enqueued: map[string][]*base.TaskMessage{ @@ -137,6 +160,12 @@ func TestDequeue(t *testing.T) { "default": {}, }, wantInProgress: []*base.TaskMessage{t1}, + wantDeadlines: []h.ZSetEntry{ + { + Msg: t1, + Score: float64(t1Deadline), + }, + }, }, { enqueued: map[string][]*base.TaskMessage{ @@ -149,6 +178,7 @@ func TestDequeue(t *testing.T) { "default": {}, }, wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, }, { enqueued: map[string][]*base.TaskMessage{ @@ -165,22 +195,34 @@ func TestDequeue(t *testing.T) { "low": {t3}, }, wantInProgress: []*base.TaskMessage{t2}, + wantDeadlines: []h.ZSetEntry{ + { + Msg: t2, + Score: float64(t2Deadline), + }, + }, }, { enqueued: map[string][]*base.TaskMessage{ - "default": {t1}, + "default": {t3}, "critical": {}, - "low": {t2, t3}, + "low": {t2, t1}, }, args: []string{"critical", "default", "low"}, - want: t1, + want: t3, err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, - "low": {t2, t3}, + "low": {t2, t1}, + }, + wantInProgress: []*base.TaskMessage{t3}, + wantDeadlines: []h.ZSetEntry{ + { + Msg: t3, + Score: float64(t3Deadline), + }, }, - wantInProgress: []*base.TaskMessage{t1}, }, { enqueued: map[string][]*base.TaskMessage{ @@ -197,6 +239,7 @@ func TestDequeue(t *testing.T) { "low": {}, }, wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, }, } @@ -224,6 +267,11 @@ func TestDequeue(t *testing.T) { if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) } + + gotDeadlines := h.GetDeadlinesEntries(t, r.client) + if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff) + } } }