2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

Update RDB.Done

This commit is contained in:
Ken Hibino 2021-02-22 07:14:12 -08:00
parent 26caccbefd
commit a745b2378a
2 changed files with 33 additions and 27 deletions

View File

@ -212,8 +212,9 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[3] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> base.TaskMessage value // KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
var doneCmd = redis.NewScript(` var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
@ -222,20 +223,23 @@ end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
local n = redis.call("INCR", KEYS[3]) if redis.call("DEL", KEYS[3]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2]) redis.call("EXPIREAT", KEYS[4], ARGV[2])
end end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines // KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd> // KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> unique key // KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value // KEYS[5] -> unique key
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
var doneUniqueCmd = redis.NewScript(` var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
@ -243,12 +247,15 @@ end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
local n = redis.call("INCR", KEYS[3]) if redis.call("DEL", KEYS[3]) == 0 then
if tonumber(n) == 1 then return redis.error_reply("NOT FOUND")
redis.call("EXPIREAT", KEYS[3], ARGV[2])
end end
if redis.call("GET", KEYS[4]) == ARGV[3] then local n = redis.call("INCR", KEYS[4])
redis.call("DEL", KEYS[4]) if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[2])
end
if redis.call("GET", KEYS[5]) == ARGV[1] then
redis.call("DEL", KEYS[5])
end end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
@ -256,24 +263,23 @@ return redis.status_reply("OK")
// Done removes the task from active queue to mark the task as done. // Done removes the task from active queue to mark the task as done.
// It removes a uniqueness lock acquired by the task, if any. // It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error { func (r *RDB) Done(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
now := time.Now() now := time.Now()
expireAt := now.Add(statsTTL) expireAt := now.Add(statsTTL)
keys := []string{ keys := []string{
base.ActiveKey(msg.Queue), base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue), base.DeadlinesKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID.String()),
base.ProcessedKey(msg.Queue, now), base.ProcessedKey(msg.Queue, now),
} }
args := []interface{}{encoded, expireAt.Unix()} argv := []interface{}{
msg.ID.String(),
expireAt.Unix(),
}
if len(msg.UniqueKey) > 0 { if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey) keys = append(keys, msg.UniqueKey)
args = append(args, msg.ID.String()) return doneUniqueCmd.Run(r.client, keys, argv...).Err()
return doneUniqueCmd.Run(r.client, keys, args...).Err()
} }
return doneCmd.Run(r.client, keys, args...).Err() return doneCmd.Run(r.client, keys, argv...).Err()
} }
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:active

View File

@ -495,7 +495,7 @@ func TestDone(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
inProgress map[string][]*base.TaskMessage // initial state of the active list active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of deadlines set deadlines map[string][]base.Z // initial state of deadlines set
target *base.TaskMessage // task to remove target *base.TaskMessage // task to remove
wantActive map[string][]*base.TaskMessage // final state of the active list wantActive map[string][]*base.TaskMessage // final state of the active list
@ -503,7 +503,7 @@ func TestDone(t *testing.T) {
}{ }{
{ {
desc: "removes message from the correct queue", desc: "removes message from the correct queue",
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"custom": {t2}, "custom": {t2},
}, },
@ -523,7 +523,7 @@ func TestDone(t *testing.T) {
}, },
{ {
desc: "with one queue", desc: "with one queue",
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
}, },
deadlines: map[string][]base.Z{ deadlines: map[string][]base.Z{
@ -539,7 +539,7 @@ func TestDone(t *testing.T) {
}, },
{ {
desc: "with multiple messages in a queue", desc: "with multiple messages in a queue",
inProgress: map[string][]*base.TaskMessage{ active: map[string][]*base.TaskMessage{
"default": {t1, t3}, "default": {t1, t3},
"custom": {t2}, "custom": {t2},
}, },
@ -562,8 +562,8 @@ func TestDone(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllActiveQueues(t, r.client, tc.active)
for _, msgs := range tc.inProgress { for _, msgs := range tc.active {
for _, msg := range msgs { for _, msg := range msgs {
// Set uniqueness lock if unique key is present. // Set uniqueness lock if unique key is present.
if len(msg.UniqueKey) > 0 { if len(msg.UniqueKey) > 0 {