2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Add KillRetryTask, KillScheduledTask methods to RDB

This commit is contained in:
Ken Hibino
2019-12-26 07:17:26 -08:00
parent d222dfd749
commit 5b98b8eb62
5 changed files with 262 additions and 18 deletions

View File

@@ -25,7 +25,6 @@ type Stats struct {
}
// EnqueuedTask is a task in a queue and is ready to be processed.
// Note: This is read only and used for monitoring purpose.
type EnqueuedTask struct {
ID xid.ID
Type string
@@ -33,7 +32,6 @@ type EnqueuedTask struct {
}
// InProgressTask is a task that's currently being processed.
// Note: This is read only and used for monitoring purpose.
type InProgressTask struct {
ID xid.ID
Type string
@@ -41,7 +39,6 @@ type InProgressTask struct {
}
// ScheduledTask is a task that's scheduled to be processed in the future.
// Note: This is read only and used for monitoring purpose.
type ScheduledTask struct {
ID xid.ID
Type string
@@ -51,7 +48,6 @@ type ScheduledTask struct {
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
// Note: This is read only and used for monitoring purpose.
type RetryTask struct {
ID xid.ID
Type string
@@ -65,7 +61,6 @@ type RetryTask struct {
}
// DeadTask is a task in that has exhausted all retries.
// Note: This is read only and used for monitoring purpose.
type DeadTask struct {
ID xid.ID
Type string
@@ -391,6 +386,71 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
return n, nil
}
// KillRetryTask finds a task that matches the given id and score from retry queue
// and moves it to dead queue. If a task that maches the id and score does not exist,
// it returns ErrTaskNotFound.
func (r *RDB) KillRetryTask(id xid.ID, score int64) error {
n, err := r.removeAndKill(base.RetryQueue, id.String(), float64(score))
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// KillScheduledTask finds a task that matches the given id and score from scheduled queue
// and moves it to dead queue. If a task that maches the id and score does not exist,
// it returns ErrTaskNotFound.
func (r *RDB) KillScheduledTask(id xid.ID, score int64) error {
n, err := r.removeAndKill(base.ScheduledQueue, id.String(), float64(score))
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) {
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[2] -> asynq:dead
// ARGV[1] -> score of the task to kill
// ARGV[2] -> id of the task to kill
// ARGV[3] -> current timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in dead queue (e.g., 100)
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("ZADD", KEYS[2], ARGV[3], msg)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
return 1
end
end
return 0
`)
now := time.Now()
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
res, err := script.Run(r.client,
[]string{zset, base.DeadQueue},
score, id, now.Unix(), limit, maxDeadTasks).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// DeleteDeadTask finds a task that matches the given id and score from dead queue
// and deletes it. If a task that matches the id and score does not exist,
// it returns ErrTaskNotFound.