diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d4742e2..fc16681 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -212,8 +212,9 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{}:processed: -// ARGV[1] -> base.TaskMessage value +// KEYS[3] -> asynq:{}:t: +// KEYS[4] -> asynq:{}:processed: +// ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp var doneCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then @@ -222,20 +223,23 @@ end if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -local n = redis.call("INCR", KEYS[3]) +if redis.call("DEL", KEYS[3]) == 0 then + return redis.error_reply("NOT FOUND") +end +local n = redis.call("INCR", KEYS[4]) if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[3], ARGV[2]) + redis.call("EXPIREAT", KEYS[4], ARGV[2]) end return redis.status_reply("OK") `) // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{}:processed: -// KEYS[4] -> unique key -// ARGV[1] -> base.TaskMessage value +// KEYS[3] -> asynq:{}:t: +// KEYS[4] -> asynq:{}:processed: +// KEYS[5] -> unique key +// ARGV[1] -> task ID // ARGV[2] -> stats expiration timestamp -// ARGV[3] -> task ID var doneUniqueCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") @@ -243,12 +247,15 @@ end if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end -local n = redis.call("INCR", KEYS[3]) -if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[3], ARGV[2]) +if redis.call("DEL", KEYS[3]) == 0 then + return redis.error_reply("NOT FOUND") end -if redis.call("GET", KEYS[4]) == ARGV[3] then - redis.call("DEL", KEYS[4]) +local n = redis.call("INCR", KEYS[4]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[4], ARGV[2]) +end +if redis.call("GET", KEYS[5]) == ARGV[1] then + redis.call("DEL", KEYS[5]) end return redis.status_reply("OK") `) @@ -256,24 +263,23 @@ return redis.status_reply("OK") // Done removes the task from active queue to mark the task as done. // It removes a uniqueness lock acquired by the task, if any. func (r *RDB) Done(msg *base.TaskMessage) error { - encoded, err := base.EncodeMessage(msg) - if err != nil { - return err - } now := time.Now() expireAt := now.Add(statsTTL) keys := []string{ base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), + base.TaskKey(msg.Queue, msg.ID.String()), base.ProcessedKey(msg.Queue, now), } - args := []interface{}{encoded, expireAt.Unix()} + argv := []interface{}{ + msg.ID.String(), + expireAt.Unix(), + } if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) - args = append(args, msg.ID.String()) - return doneUniqueCmd.Run(r.client, keys, args...).Err() + return doneUniqueCmd.Run(r.client, keys, argv...).Err() } - return doneCmd.Run(r.client, keys, args...).Err() + return doneCmd.Run(r.client, keys, argv...).Err() } // KEYS[1] -> asynq:{}:active diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index d82bc95..8ff0db1 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -495,7 +495,7 @@ func TestDone(t *testing.T) { tests := []struct { desc string - inProgress map[string][]*base.TaskMessage // initial state of the active list + active map[string][]*base.TaskMessage // initial state of the active list deadlines map[string][]base.Z // initial state of deadlines set target *base.TaskMessage // task to remove wantActive map[string][]*base.TaskMessage // final state of the active list @@ -503,7 +503,7 @@ func TestDone(t *testing.T) { }{ { desc: "removes message from the correct queue", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1}, "custom": {t2}, }, @@ -523,7 +523,7 @@ func TestDone(t *testing.T) { }, { desc: "with one queue", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1}, }, deadlines: map[string][]base.Z{ @@ -539,7 +539,7 @@ func TestDone(t *testing.T) { }, { desc: "with multiple messages in a queue", - inProgress: map[string][]*base.TaskMessage{ + active: map[string][]*base.TaskMessage{ "default": {t1, t3}, "custom": {t2}, }, @@ -562,8 +562,8 @@ func TestDone(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllDeadlines(t, r.client, tc.deadlines) - h.SeedAllActiveQueues(t, r.client, tc.inProgress) - for _, msgs := range tc.inProgress { + h.SeedAllActiveQueues(t, r.client, tc.active) + for _, msgs := range tc.active { for _, msg := range msgs { // Set uniqueness lock if unique key is present. if len(msg.UniqueKey) > 0 {