Refactor redis keys and store messages in protobuf

Changes:
- Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout".
- Redis LIST and ZSET stores task message IDs
- Task messages are serialized using protocol buffer
This commit is contained in:
Ken Hibino
2021-03-12 16:23:08 -08:00
parent 2516c4baba
commit 5f61566284
22 changed files with 2534 additions and 668 deletions

View File

@@ -5,7 +5,6 @@
package rdb
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -110,7 +109,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
@@ -135,7 +134,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
key := cast.ToString(data[i])
val := cast.ToInt(data[i+1])
switch key {
case base.QueueKey(qname):
case base.PendingKey(qname):
stats.Pending = val
size += val
case base.ActiveKey(qname):
@@ -312,7 +311,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.QueueKey(qname), pgn)
return r.listMessages(base.PendingKey(qname), qname, pgn)
}
// ListActive returns all tasks that are currently being processed for the given queue.
@@ -320,23 +319,42 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.ActiveKey(qname), pgn)
return r.listMessages(base.ActiveKey(qname), qname, pgn)
}
// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
// ARGV[1] -> start offset
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {}
for _, id in ipairs(ids) do
local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(key, start, stop).Result()
res, err := listMessagesCmd.Run(r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
reverse(data)
var msgs []*base.TaskMessage
for _, s := range data {
m, err := base.DecodeMessage(s)
m, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
@@ -352,7 +370,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.ScheduledKey(qname), pgn)
return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
}
// ListRetry returns all tasks from the given queue that have failed before
@@ -361,7 +379,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.RetryKey(qname), pgn)
return r.listZSetEntries(base.RetryKey(qname), qname, pgn)
}
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
@@ -369,36 +387,63 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.ArchivedKey(qname), pgn)
return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
}
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, msg2, score2, ..., msgN, scoreN]
var listZSetEntriesCmd = redis.NewScript(`
local res = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
end
return res
`)
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) {
res, err := listZSetEntriesCmd.Run(r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
var res []base.Z
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
data, err := cast.ToSliceE(res)
if err != nil {
return nil, err
}
var zs []base.Z
for i := 0; i < len(data); i += 2 {
s, err := cast.ToStringE(data[i])
if err != nil {
return nil, err
}
msg, err := base.DecodeMessage(s)
score, err := cast.ToInt64E(data[i+1])
if err != nil {
return nil, err
}
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
res = append(res, base.Z{Message: msg, Score: int64(z.Score)})
zs = append(zs, base.Z{Message: msg, Score: score})
}
return res, nil
return zs, nil
}
// RunArchivedTask finds an archived task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -411,8 +456,8 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
// RunRetryTask finds a retry task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -425,8 +470,8 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
// RunScheduledTask finds a scheduled task that matches the given id and score from
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -439,35 +484,35 @@ func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ScheduledKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ScheduledKey(qname), base.PendingKey(qname))
}
// RunAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.RetryKey(qname), base.PendingKey(qname))
}
// RunAllArchivedTasks enqueues all archived tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
}
// KEYS[1] -> sorted set to remove the id from
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
var removeAndRunCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
return 1
end
local n = redis.call("ZREM", KEYS[1], ARGV[1])
if n == 0 then
return 0
end
return 0`)
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, score, id).Result()
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result()
if err != nil {
return 0, err
}
@@ -479,12 +524,12 @@ func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error)
}
var removeAndRunAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
end
return table.getn(msgs)`)
return table.getn(ids)`)
func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
res, err := removeAndRunAllCmd.Run(r.client, []string{zset, qkey}).Result()
@@ -498,10 +543,11 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
return n, nil
}
// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveRetryTask finds a retry task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@@ -511,10 +557,11 @@ func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
return nil
}
// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveScheduledTask finds a scheduled task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@@ -526,13 +573,12 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) erro
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> task message to archive
// ARGV[1] -> ID of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archive (e.g., 100)
var archivePendingCmd = redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 1, ARGV[1])
if x == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
@@ -541,47 +587,33 @@ redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) archivePending(qname, msg string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
res, err := archivePendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
// ArchivePendingTask finds a pending task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
keys := []string{
base.PendingKey(qname),
base.ArchivedKey(qname),
}
now := time.Now()
argv := []interface{}{
id.String(),
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archivePendingCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.archivePending(qname, s)
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("command error: unexpected return value %v", res)
}
return ErrTaskNotFound
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
@@ -596,66 +628,64 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var archiveAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks that were moved.
// returns the number of tasks moved.
func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{now.Unix(), limit, maxArchiveSize}
res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result()
argv := []interface{}{
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> score of the task to archive
// ARGV[2] -> id of the task to archive
// ARGV[3] -> current timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archived state (e.g., 100)
// ARGV[1] -> id of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
var removeAndArchiveCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("ZADD", KEYS[2], ARGV[3], msg)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
return 1
end
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
return 0
end
return 0`)
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) {
func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveCmd.Run(r.client,
[]string{src, dst},
score, id, now.Unix(), limit, maxArchiveSize).Result()
id, now.Unix(), limit, maxArchiveSize).Result()
if err != nil {
return 0, err
}
@@ -666,108 +696,106 @@ func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[1] -> ZSET to move task from (e.g., asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var removeAndArchiveAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
return table.getn(msgs)`)
redis.call("DEL", KEYS[1])
return table.getn(ids)`)
func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst},
now.Unix(), limit, maxArchiveSize).Result()
argv := []interface{}{
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := removeAndArchiveAllCmd.Run(r.client,
[]string{src, dst}, argv...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score))
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.ArchivedKey(qname), qname, id.String())
}
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.RetryKey(qname), id.String(), float64(score))
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.RetryKey(qname), qname, id.String())
}
// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score))
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.ScheduledKey(qname), qname, id.String())
}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
var deletePendingTaskCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return 0
end
return redis.call("DEL", KEYS[2])
`)
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.client.LRem(qkey, 1, s).Result()
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
return err
}
}
return nil
}
}
return ErrTaskNotFound
}
var deleteTaskCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
redis.call("DEL", decoded["UniqueKey"])
end
return 1
end
end
return 0`)
func (r *RDB) deleteTask(key, id string, score float64) error {
res, err := deleteTaskCmd.Run(r.client, []string{key}, score, id).Result()
keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())}
res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
return fmt.Errorf("command error: unexpected return value %v", res)
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
var deleteTaskCmd = redis.NewScript(`
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
return 0
end
return redis.call("DEL", KEYS[2])
`)
func (r *RDB) deleteTask(key, qname, id string) error {
keys := []string{key, base.TaskKey(qname, id)}
argv := []interface{}{id}
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("command error: unexpected return value %v", res)
}
if n == 0 {
return ErrTaskNotFound
@@ -776,37 +804,36 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
}
// KEYS[1] -> queue to delete
// ARGV[1] -> task key prefix
var deleteAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
return r.deleteAll(base.ArchivedKey(qname))
return r.deleteAll(base.ArchivedKey(qname), qname)
}
// DeleteAllRetryTasks deletes all retry tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) {
return r.deleteAll(base.RetryKey(qname))
return r.deleteAll(base.RetryKey(qname), qname)
}
// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) {
return r.deleteAll(base.ScheduledKey(qname))
return r.deleteAll(base.ScheduledKey(qname), qname)
}
func (r *RDB) deleteAll(key string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
func (r *RDB) deleteAll(key, qname string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return 0, err
}
@@ -817,28 +844,28 @@ func (r *RDB) deleteAll(key string) (int64, error) {
return n, nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// ARGV[1] -> task key prefix
var deleteAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result()
res, err := deleteAllPendingCmd.Run(r.client,
[]string{base.PendingKey(qname)}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
@@ -868,11 +895,27 @@ func (e *ErrQueueNotEmpty) Error() string {
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2])
if active > 0 then
return redis.error_reply("Queue has tasks active")
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
@@ -882,22 +925,36 @@ redis.call("DEL", KEYS[6])
return redis.status_reply("OK")`)
// Checks whether queue is empty before removing.
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueCmd = redis.NewScript(`
local pending = redis.call("LLEN", KEYS[1])
local active = redis.call("LLEN", KEYS[2])
local scheduled = redis.call("SCARD", KEYS[3])
local retry = redis.call("SCARD", KEYS[4])
local archived = redis.call("SCARD", KEYS[5])
local total = pending + active + scheduled + retry + archived
if total > 0 then
local ids = {}
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
table.insert(ids, id)
end
if table.getn(ids) > 0 then
return redis.error_reply("QUEUE NOT EMPTY")
end
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
@@ -927,14 +984,14 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
script = removeQueueCmd
}
keys := []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
base.ArchivedKey(qname),
base.DeadlinesKey(qname),
}
if err := script.Run(r.client, keys).Err(); err != nil {
if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil {
if err.Error() == "QUEUE NOT EMPTY" {
return &ErrQueueNotEmpty{qname}
}
@@ -967,46 +1024,47 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
if err != nil {
continue // skip bad data
}
var info base.ServerInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
info, err := base.DecodeServerInfo([]byte(data))
if err != nil {
continue // skip bad data
}
servers = append(servers, &info)
servers = append(servers, info)
}
return servers, nil
}
// Note: Script also removes stale keys.
var listWorkerKeysCmd = redis.NewScript(`
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)
local res = {}
for _, key in ipairs(keys) do
local vals = redis.call("HVALS", key)
for _, v in ipairs(vals) do
table.insert(res, v)
end
end
return res`)
// ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
now := time.Now()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil {
return nil, err
}
keys, err := cast.ToStringSliceE(res)
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var workers []*base.WorkerInfo
for _, key := range keys {
data, err := r.client.HVals(key).Result()
for _, s := range data {
w, err := base.DecodeWorkerInfo([]byte(s))
if err != nil {
continue // skip bad data
}
for _, s := range data {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(s), &w); err != nil {
continue // skip bad data
}
workers = append(workers, &w)
}
workers = append(workers, w)
}
return workers, nil
}
@@ -1036,11 +1094,11 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
continue // skip bad data
}
for _, s := range data {
var e base.SchedulerEntry
if err := json.Unmarshal([]byte(s), &e); err != nil {
e, err := base.DecodeSchedulerEntry([]byte(s))
if err != nil {
continue // skip bad data
}
entries = append(entries, &e)
entries = append(entries, e)
}
}
return entries, nil
@@ -1059,11 +1117,11 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas
if err != nil {
return nil, err
}
var e base.SchedulerEnqueueEvent
if err := json.Unmarshal([]byte(data), &e); err != nil {
e, err := base.DecodeSchedulerEnqueueEvent([]byte(data))
if err != nil {
return nil, err
}
events = append(events, &e)
events = append(events, e)
}
return events, nil
}
@@ -1096,7 +1154,7 @@ func (r *RDB) Unpause(qname string) error {
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (r *RDB) ClusterKeySlot(qname string) (int64, error) {
key := base.QueueKey(qname)
key := base.PendingKey(qname)
return r.client.ClusterKeySlot(key).Result()
}