mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Delete stale code
This commit is contained in:
parent
28d698c24e
commit
2af9eb2c88
@ -77,26 +77,12 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Note: LREM count ZERO means "remove all elements equal to val"
|
// Note: LREM count ZERO means "remove all elements equal to val"
|
||||||
// Note: Script will try removing the message by exact match first,
|
|
||||||
// if the task is mutated and exact match is not found, it'll fallback
|
|
||||||
// to finding a match with ID.
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
|
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
|
||||||
// ARGV[1] -> base.TaskMessage value
|
// ARGV[1] -> base.TaskMessage value
|
||||||
// ARGV[2] -> stats expiration timestamp
|
// ARGV[2] -> stats expiration timestamp
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
if tonumber(x) == 0 then
|
|
||||||
local target = cjson.decode(ARGV[1])
|
|
||||||
local data = redis.call("LRANGE", KEYS[1], 0, -1)
|
|
||||||
for _, s in ipairs(data) do
|
|
||||||
local msg = cjson.decode(s)
|
|
||||||
if target["ID"] == msg["ID"] then
|
|
||||||
redis.call("LREM", KEYS[1], 0, s)
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
local n = redis.call("INCR", KEYS[2])
|
local n = redis.call("INCR", KEYS[2])
|
||||||
if tonumber(n) == 1 then
|
if tonumber(n) == 1 then
|
||||||
redis.call("EXPIREAT", KEYS[2], ARGV[2])
|
redis.call("EXPIREAT", KEYS[2], ARGV[2])
|
||||||
@ -157,9 +143,6 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Note: Script will try removing the message by exact match first,
|
|
||||||
// if the task is mutated and exact match is not found, it'll fallback
|
|
||||||
// to finding a match with ID.
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:retry
|
// KEYS[2] -> asynq:retry
|
||||||
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||||
@ -169,18 +152,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
// ARGV[3] -> retry_at UNIX timestamp
|
// ARGV[3] -> retry_at UNIX timestamp
|
||||||
// ARGV[4] -> stats expiration timestamp
|
// ARGV[4] -> stats expiration timestamp
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
if tonumber(x) == 0 then
|
|
||||||
local target = cjson.decode(ARGV[1])
|
|
||||||
local data = redis.call("LRANGE", KEYS[1], 0, -1)
|
|
||||||
for _, s in ipairs(data) do
|
|
||||||
local msg = cjson.decode(s)
|
|
||||||
if target["ID"] == msg["ID"] then
|
|
||||||
redis.call("LREM", KEYS[1], 0, s)
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
local n = redis.call("INCR", KEYS[3])
|
local n = redis.call("INCR", KEYS[3])
|
||||||
if tonumber(n) == 1 then
|
if tonumber(n) == 1 then
|
||||||
@ -225,9 +197,6 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
|||||||
processedKey := base.ProcessedKey(now)
|
processedKey := base.ProcessedKey(now)
|
||||||
failureKey := base.FailureKey(now)
|
failureKey := base.FailureKey(now)
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
// Note: Script will try removing the message by exact match first,
|
|
||||||
// if the task is mutated and exact match is not found, it'll fallback
|
|
||||||
// to finding a match with ID.
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:dead
|
// KEYS[2] -> asynq:dead
|
||||||
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
|
||||||
@ -239,18 +208,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
|||||||
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
|
||||||
// ARGV[6] -> stats expiration timestamp
|
// ARGV[6] -> stats expiration timestamp
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
|
redis.call("LREM", KEYS[1], 0, ARGV[1])
|
||||||
if tonumber(x) == 0 then
|
|
||||||
local target = cjson.decode(ARGV[1])
|
|
||||||
local data = redis.call("LRANGE", KEYS[1], 0, -1)
|
|
||||||
for _, s in ipairs(data) do
|
|
||||||
local msg = cjson.decode(s)
|
|
||||||
if target["ID"] == msg["ID"] then
|
|
||||||
redis.call("LREM", KEYS[1], 0, s)
|
|
||||||
break
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
|
||||||
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
|
||||||
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
|
||||||
|
Loading…
Reference in New Issue
Block a user