diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index a5a3aef..35df06d 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -77,26 +77,12 @@ func (r *RDB) Done(msg *base.TaskMessage) error { return err } // Note: LREM count ZERO means "remove all elements equal to val" - // Note: Script will try removing the message by exact match first, - // if the task is mutated and exact match is not found, it'll fallback - // to finding a match with ID. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:processed: // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp script := redis.NewScript(` - local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) - if tonumber(x) == 0 then - local target = cjson.decode(ARGV[1]) - local data = redis.call("LRANGE", KEYS[1], 0, -1) - for _, s in ipairs(data) do - local msg = cjson.decode(s) - if target["ID"] == msg["ID"] then - redis.call("LREM", KEYS[1], 0, s) - break - end - end - end + redis.call("LREM", KEYS[1], 0, ARGV[1]) local n = redis.call("INCR", KEYS[2]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[2], ARGV[2]) @@ -157,9 +143,6 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e if err != nil { return err } - // Note: Script will try removing the message by exact match first, - // if the task is mutated and exact match is not found, it'll fallback - // to finding a match with ID. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry // KEYS[3] -> asynq:processed: @@ -169,18 +152,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration timestamp script := redis.NewScript(` - local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) - if tonumber(x) == 0 then - local target = cjson.decode(ARGV[1]) - local data = redis.call("LRANGE", KEYS[1], 0, -1) - for _, s in ipairs(data) do - local msg = cjson.decode(s) - if target["ID"] == msg["ID"] then - redis.call("LREM", KEYS[1], 0, s) - break - end - end - end + redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) local n = redis.call("INCR", KEYS[3]) if tonumber(n) == 1 then @@ -225,9 +197,6 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { processedKey := base.ProcessedKey(now) failureKey := base.FailureKey(now) expireAt := now.Add(statsTTL) - // Note: Script will try removing the message by exact match first, - // if the task is mutated and exact match is not found, it'll fallback - // to finding a match with ID. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:dead // KEYS[3] -> asynq:processed: @@ -239,18 +208,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { // ARGV[5] -> max number of tasks in dead queue (e.g., 100) // ARGV[6] -> stats expiration timestamp script := redis.NewScript(` - local x = redis.call("LREM", KEYS[1], 0, ARGV[1]) - if tonumber(x) == 0 then - local target = cjson.decode(ARGV[1]) - local data = redis.call("LRANGE", KEYS[1], 0, -1) - for _, s in ipairs(data) do - local msg = cjson.decode(s) - if target["ID"] == msg["ID"] then - redis.call("LREM", KEYS[1], 0, s) - break - end - end - end + redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])