From bee784c052d871ec50a1c2a5b7dbb6003b2c7f63 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Thu, 18 Jun 2020 07:10:57 -0700 Subject: [PATCH] Update RDB.Done to remove message from deadlines set --- internal/asynqtest/asynqtest.go | 6 +++ internal/rdb/rdb.go | 21 +++++---- internal/rdb/rdb_test.go | 83 +++++++++++++++++++++++++++++++-- 3 files changed, 97 insertions(+), 13 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index a3049f2..0f18140 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -187,6 +187,12 @@ func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) { seedRedisZSet(tb, r, base.DeadQueue, entries) } +// SeedDeadlines initializes the deadlines set with the given entries. +func SeedDeadlines(tb testing.TB, r *redis.Client, entries []ZSetEntry) { + tb.Helper() + seedRedisZSet(tb, r, base.KeyDeadlines, entries) +} + func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.TaskMessage) { data := MustMarshalSlice(tb, msgs) for _, s := range data { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 0d0a7bb..08af65f 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -186,8 +186,9 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err e } // KEYS[1] -> asynq:in_progress -// KEYS[2] -> asynq:processed: -// KEYS[3] -> unique key in the format :: +// KEYS[2] -> asynq:deadlines +// KEYS[3] -> asynq:processed: +// KEYS[4] -> unique key in the format :: // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp // ARGV[3] -> task ID @@ -197,12 +198,16 @@ local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) if x == 0 then return redis.error_reply("NOT FOUND") end -local n = redis.call("INCR", KEYS[2]) -if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[2], ARGV[2]) +x = redis.call("ZREM", KEYS[2], ARGV[1]) +if x == 0 then + return redis.error_reply("NOT FOUND") end -if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then - redis.call("DEL", KEYS[3]) +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[2]) +end +if string.len(KEYS[4]) > 0 and redis.call("GET", KEYS[4]) == ARGV[3] then + redis.call("DEL", KEYS[4]) end return redis.status_reply("OK") `) @@ -218,7 +223,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { processedKey := base.ProcessedKey(now) expireAt := now.Add(statsTTL) return doneCmd.Run(r.client, - []string{base.InProgressQueue, processedKey, msg.UniqueKey}, + []string{base.InProgressQueue, base.KeyDeadlines, processedKey, msg.UniqueKey}, encoded, expireAt.Unix(), msg.ID.String()).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7c2d1ed..309918d 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -384,40 +384,108 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { func TestDone(t *testing.T) { r := setup(t) - t1 := h.NewTaskMessage("send_email", nil) - t2 := h.NewTaskMessage("export_csv", nil) + now := time.Now() + t1 := &base.TaskMessage{ + ID: xid.New(), + Type: "send_email", + Payload: nil, + Timeout: 1800, + Deadline: 0, + } + t1Deadline := int(now.Unix()) + t1.Timeout + t2 := &base.TaskMessage{ + ID: xid.New(), + Type: "export_csv", + Payload: nil, + Timeout: 0, + Deadline: 1592485787, + } + t2Deadline := t2.Deadline t3 := &base.TaskMessage{ ID: xid.New(), Type: "reindex", Payload: nil, + Timeout: 1800, + Deadline: 0, UniqueKey: "reindex:nil:default", Queue: "default", } + t3Deadline := int(now.Unix()) + t3.Deadline tests := []struct { inProgress []*base.TaskMessage // initial state of the in-progress list + deadlines []h.ZSetEntry // initial state of deadlines set target *base.TaskMessage // task to remove wantInProgress []*base.TaskMessage // final state of the in-progress list + wantDeadlines []h.ZSetEntry // final state of the deadline set }{ { - inProgress: []*base.TaskMessage{t1, t2}, + inProgress: []*base.TaskMessage{t1, t2}, + deadlines: []h.ZSetEntry{ + { + Msg: t1, + Score: float64(t1Deadline), + }, + { + Msg: t2, + Score: float64(t2Deadline), + }, + }, target: t1, wantInProgress: []*base.TaskMessage{t2}, + wantDeadlines: []h.ZSetEntry{ + { + Msg: t2, + Score: float64(t2Deadline), + }, + }, }, { - inProgress: []*base.TaskMessage{t1}, + inProgress: []*base.TaskMessage{t1}, + deadlines: []h.ZSetEntry{ + { + Msg: t1, + Score: float64(t1Deadline), + }, + }, target: t1, wantInProgress: []*base.TaskMessage{}, + wantDeadlines: []h.ZSetEntry{}, }, { - inProgress: []*base.TaskMessage{t1, t2, t3}, + inProgress: []*base.TaskMessage{t1, t2, t3}, + deadlines: []h.ZSetEntry{ + { + Msg: t1, + Score: float64(t1Deadline), + }, + { + Msg: t2, + Score: float64(t2Deadline), + }, + { + Msg: t3, + Score: float64(t3Deadline), + }, + }, target: t3, wantInProgress: []*base.TaskMessage{t1, t2}, + wantDeadlines: []h.ZSetEntry{ + { + Msg: t1, + Score: float64(t1Deadline), + }, + { + Msg: t2, + Score: float64(t2Deadline), + }, + }, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case + h.SeedDeadlines(t, r.client, tc.deadlines) h.SeedInProgressQueue(t, r.client, tc.inProgress) for _, msg := range tc.inProgress { // Set uniqueness lock if unique key is present. @@ -440,6 +508,11 @@ func TestDone(t *testing.T) { t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) continue } + gotDeadlines := h.GetDeadlinesEntries(t, r.client) + if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.KeyDeadlines, diff) + continue + } processedKey := base.ProcessedKey(time.Now()) gotProcessed := r.client.Get(processedKey).Val()